/**
 * Copyright @2019 Josin All Rights Reserved.
 * Author: Josin
 * Email : xeapplee@gmail.com
 *
 * Hello everybody. Use this mysqlbinlog component
 * to sync data to another DB or NoSQL engine.
 *
 * Here is a short manual to get you started with the routine.
 *
 * Normally, the first step is to register a slave with the master,
 * then send COM_BINLOG_DUMP or COM_BINLOG_DUMP_GTID.
 * If that succeeds, the MySQL server sends an OK response, after which you
 * will receive MySQL packets from the master.
 * You are allowed to copy & view the source code, but if you modify it or do
 * other work on it, please submit a PR to the source repository. Thank you.
 * Git:
 *      https://gitee.com/josinli/mysqlbinlog2rabbitmq
 */

#ifndef MYSQL_BINLOG_MYSQL_BINLOG_H
#define MYSQL_BINLOG_MYSQL_BINLOG_H

#include <sys/uio.h>
#include <mysql.h>
#include <fc_list.h>
#include <fc_string.h>
#include <fc_config.h>
#include <mb_socket.h>
#include <base64.h>
#include <pthread.h>

/**
 * @brief NOTICE
 * This structure is used for the table schema fields only.
 * One node describes one column; the role of each member below is taken
 * from the original author's notes.
 */
typedef struct _LIST_DATA
{
    int       id; /* Used for the column count */
    long     val; /* Used for the table ID */
    int      ity; /* Used for the column type */
    int      ile; /* Length the column occupies. */
    CSTRING *key; /* The field name */
    int      precision; /* If the type is decimal, this is the precision */
    int      scale;     /* If the type is decimal, this is the decimal scale */
    void    *pva; /* Not used now, kept reserved. */
    void    *nva; /* Not used now, kept reserved. */
} LIST_DATA;

/**
 * @brief NOTICE
 * Records the FORMAT_DESCRIPTION_EVENT response.
 * It is stored for later use and destroyed when the next
 * FORMAT_DESCRIPTION_EVENT arrives.
 *
 * NOTE(review): there is no `typedef` in front of this declaration, so it
 * defines a file-scope object named FORMAT_description_DATA (of type
 * struct _FORMAT_DESCRIPTION_DATA) in every translation unit that includes
 * this header, rather than a type alias. Confirm that this is intended.
 */
struct _FORMAT_DESCRIPTION_DATA
{
    char      nes;              /* Next events size */
    char     *eventsLengths;    /* Its length must be dynamically allocated. */
} FORMAT_description_DATA;

/*
 * E_RMQ_TYPE: event kinds, matching the `etype` values shown in the JSON
 * layout documented for RMQ_NODE (1 UPDATE, 2 INSERT, 3 DELETE, 4 SQL).
 */
enum
{
    E_RMQ_UPDATE    = 1,
    E_RMQ_INSERT    = 2,
    E_RMQ_DELETE    = 3,
    E_RMQ_SQL_QUERY = 4
};

/**
 * @brief NOTICE
 * This structure is used for the RMQ publish list,
 * to ensure that each message is delivered to RMQ successfully.
 * The post body is JSON, in one of the following formats:
 * {
 *      etype: 1, // UPDATE event
 *      data: {
 *          pre: {
 *              "name": "Josin",
 *              "age" : 25
 *          },
 *          new: {
 *              "name": "Josin",
 *              "age" : 26
 *          }
 *      }
 * }
 *
 * {
 *      etype: 2, // INSERT
 *      data: {
 *          "id": 1,
 *          "name": "Josin",
 *          "addr": "China"
 *      }
 * }
 *
 * {
 *      etype: 3, // DELETE
 *      data: {
 *          "id": 1,
 *          "name": "Josin",
 *          "addr": "China"
 *      }
 * }
 *
 * {
 *      etype: 4, // SQL
 *      data: "ALTER TABLE `books`.`test_at` MODIFY COLUMN `cc` datetime(0) NULL DEFAULT NULL AFTER `bb`"
 * }
 */
typedef struct _RMQ_LIST_DATA
{
    char *response; /* The JSON-encoded result */
    int   status;   /* 0 by default; 1 means the node has been handled. */
} RMQ_NODE;


/**
 * Binlog event type codes.
 *
 * Every time you update this enum (when you add a type), you have to
 * fix Format_description_event::Format_description_event().
 *
 * Existing events (except ENUM_END_EVENT) should never change their numbers.
 */
enum LOG_EVENT_TYPE
{
    UNKNOWN_EVENT             = 0,
    START_EVENT_V3            = 1,
    QUERY_EVENT               = 2,
    STOP_EVENT                = 3,
    ROTATE_EVENT              = 4,
    INTVAR_EVENT              = 5,
    LOAD_EVENT                = 6,
    SLAVE_EVENT               = 7,
    CREATE_FILE_EVENT         = 8,
    APPEND_BLOCK_EVENT        = 9,
    EXEC_LOAD_EVENT           = 10,
    DELETE_FILE_EVENT         = 11,

    /*
     * NEW_LOAD_EVENT is like LOAD_EVENT except that it has a longer
     * sql_ex, allowing multibyte TERMINATED BY etc; both types share the
     * same class (Load_event).
     */
    NEW_LOAD_EVENT            = 12,
    RAND_EVENT                = 13,
    USER_VAR_EVENT            = 14,
    FORMAT_DESCRIPTION_EVENT  = 15,
    XID_EVENT                 = 16,
    BEGIN_LOAD_QUERY_EVENT    = 17,
    EXECUTE_LOAD_QUERY_EVENT  = 18,

    TABLE_MAP_EVENT           = 19,

    /*
     * The PRE_GA event numbers were used for 5.1.0 to 5.1.15 and are
     * therefore obsolete.
     */
    PRE_GA_WRITE_ROWS_EVENT   = 20,
    PRE_GA_UPDATE_ROWS_EVENT  = 21,
    PRE_GA_DELETE_ROWS_EVENT  = 22,

    /* The V1 event numbers are used from 5.1.16 until mysql-trunk-xx. */
    WRITE_ROWS_EVENT_V1       = 23,
    UPDATE_ROWS_EVENT_V1      = 24,
    DELETE_ROWS_EVENT_V1      = 25,

    /* Something out of the ordinary happened on the master. */
    INCIDENT_EVENT            = 26,

    /*
     * Heartbeat event sent by the master during its idle time
     * to confirm the master's online status to the slave.
     */
    HEARTBEAT_LOG_EVENT       = 27,

    /*
     * In some situations, it is necessary to send over ignorable
     * data to the slave: data that a slave can handle in case there
     * is code for handling it, but which can be ignored if it is not
     * recognized.
     */
    IGNORABLE_LOG_EVENT       = 28,
    ROWS_QUERY_LOG_EVENT      = 29,

    /* Version 2 of the Row events. */
    WRITE_ROWS_EVENT          = 30,
    UPDATE_ROWS_EVENT         = 31,
    DELETE_ROWS_EVENT         = 32,

    GTID_LOG_EVENT            = 33,
    ANONYMOUS_GTID_LOG_EVENT  = 34,

    PREVIOUS_GTIDS_LOG_EVENT  = 35,

    TRANSACTION_CONTEXT_EVENT = 36,

    VIEW_CHANGE_EVENT         = 37,

    /* Prepared XA transaction terminal event similar to Xid. */
    XA_PREPARE_LOG_EVENT      = 38,

    /* Add new events here - right above this comment! */
    ENUM_END_EVENT /* end marker */
};

/*
 * Storage-class helper macros: GLOBAL_STATIC_VARS expands to `static`,
 * GLOBAL_VARS expands to nothing.
 */
#define GLOBAL_STATIC_VARS static
#define GLOBAL_VARS
/*
 * Counter initialised to 0. Because the definition is `static` and lives in
 * a header, every translation unit that includes this header gets its own
 * private copy.
 * NOTE(review): presumably the MySQL packet sequence number - confirm usage,
 * and confirm that per-file copies are what the callers expect.
 */
GLOBAL_STATIC_VARS unsigned int sequence_id = 0;

/**
 * @brief NOTICE
 * Every MySQL request packet must start with this header.
 *
 * The two bit-fields add up to 32 bits: a 24-bit `plen` followed by an
 * 8-bit `psid`.
 * NOTE(review): presumably payload length and packet sequence id, going by
 * the names - confirm. How bit-fields are laid out in memory is
 * implementation-defined, so sending this struct as raw bytes depends on the
 * compiler and byte order - confirm against the target platforms.
 */
typedef struct _MYSQL_PACKET_HEADER {
    unsigned plen: 24;
    unsigned psid: 8;
} MYSQL_PACKET_HEADER;

/**
 * @brief NOTICE
 * Column type helpers.
 *
 * MYSQL_TYPE_BINARY is a pseudo type code with the value -2.
 *
 * MySQL_TYPE_LEN is a table indexed by the numeric MySQL column type
 * (0 .. 19); each entry is the length recorded for that type. Entries
 * tagged "unverified" were marked as not sure by the original author.
 */
#define MYSQL_TYPE_BINARY -2
static int   MySQL_TYPE_LEN[] = {
    /*  0 DECIMAL (unverified) */
    5,
    /*  1 TINY */
    1,
    /*  2 SHORT */
    2,
    /*  3 LONG */
    4,
    /*  4 FLOAT */
    4,
    /*  5 DOUBLE */
    8,
    /*  6 NULL (unverified) */
    1,
    /*  7 TIMESTAMP */
    4,
    /*  8 LONG LONG */
    8,
    /*  9 INT24 */
    3,
    /*
     * 10 DATE (unverified)
     *      year  = (time & ((1 << 15) - 1) << 9) >> 9
     *      month = (time & ((1 << 4) - 1) << 5) >> 5
     *      day   = (time & ((1 << 5) - 1))
     */
    3,
    /*
     * 11 TIME (unverified)
     *      hours   = int(time / 10000)
     *      minutes = int((time % 10000) / 100)
     *      seconds = int(time % 100)
     */
    3,
    /* 12 DATETIME (unverified) */
    8,
    /* 13 YEAR, stored as an offset to add to 1900 */
    1,
    /* 14 NEWDATE (unverified) */
    0,
    /* 15 VARCHAR, may be 1 when the length is less than 255 */
    2,
    /* 16 BIT (unverified) */
    1,
    /* 17 TIMESTAMP2 */
    4,
    /*
     * 18 DATETIME2, 40 bits = 5 bytes
     *       1 bit  sign           (1 = non-negative, 0 = negative)
     *      17 bits year*13+month  (year 0-9999, month 0-12)
     *       5 bits day            (0-31)
     *       5 bits hour           (0-23)
     *       6 bits minute         (0-59)
     *       6 bits second         (0-59)
     */
    5,
    /*
     * 19 TIME2, 24 bits = 3 bytes
     *       1 bit  sign    (1 = non-negative, 0 = negative)
     *       1 bit  unused  (reserved for future extensions)
     *      10 bits hour    (0-838)
     *       6 bits minute  (0-59)
     *       6 bits second  (0-59)
     */
    3
};

/*
 * Printable names of the MySQL column types, indexed by the numeric type
 * code (0 .. 19), in the same order as MySQL_TYPE_LEN.
 */
static char *MySQL_TYPES[] = {
    /*  0 */ "MYSQL_TYPE_DECIMAL",
    /*  1 */ "MYSQL_TYPE_TINY",
    /*  2 */ "MYSQL_TYPE_SHORT",
    /*  3 */ "MYSQL_TYPE_LONG",
    /*  4 */ "MYSQL_TYPE_FLOAT",
    /*  5 */ "MYSQL_TYPE_DOUBLE",
    /*  6 */ "MYSQL_TYPE_NULL",
    /*  7 */ "MYSQL_TYPE_TIMESTAMP",
    /*  8 */ "MYSQL_TYPE_LONGLONG",
    /*  9 */ "MYSQL_TYPE_INT24",
    /* 10 */ "MYSQL_TYPE_DATE",
    /* 11 */ "MYSQL_TYPE_TIME",
    /* 12 */ "MYSQL_TYPE_DATETIME",
    /* 13 */ "MYSQL_TYPE_YEAR",
    /* 14 */ "MYSQL_TYPE_NEWDATE",
    /* 15 */ "MYSQL_TYPE_VARCHAR",
    /* 16 */ "MYSQL_TYPE_BIT",
    /* 17 */ "MYSQL_TYPE_TIMESTAMP2",
    /* 18 */ "MYSQL_TYPE_DATETIME2",
    /* 19 */ "MYSQL_TYPE_TIME2"
};
/*
 * File-scope list pointer, NULL until something assigns it. Being `static`
 * in a header, each including translation unit has its own copy.
 * NOTE(review): presumably holds the table schemas built from table map
 * events - confirm against the implementation.
 */
static FCL_LIST *tableMaps = NULL;
/*
 * File-scope flag/counter starting at 0 (also one copy per translation
 * unit). Its meaning is not visible in this header.
 */
static int  ReadyJob = 0;
/**
 * @brief NOTICE
 * Absolute path of the XML file holding the core configuration.
 */
#define CONF_FILE "/etc/mbinlogmq/binlog.xml"

/**
 * @brief NOTICE
 * Scratch unions used to turn raw bytes into an int or long.
 * They are global variables, so memset them to zero before each use,
 * otherwise stale bytes from a previous conversion remain in place.
 *
 * Each union overlays a byte array with an integer bit-field of the same
 * total width, so the value read back depends on the host byte order.
 *
 * NOTE(review): these are object definitions (no `static`, `extern` or
 * `typedef`) placed in a header. If the header is included from more than
 * one translation unit, this may produce duplicate-definition link errors
 * on toolchains that default to -fno-common - confirm the build flags.
 * NOTE(review): bit-fields declared as `unsigned long` / `unsigned char`
 * are an implementation-defined extension, and the 40/48/64-bit widths
 * require `unsigned long` to be at least that wide (not the case on 32-bit
 * targets) - confirm the supported platforms.
 */
/* 4 bytes <-> 32-bit value */
union {
    unsigned char fchar[4];
    unsigned long flong:32;
} fchar2long;

/* A single 6-bit field */
union {
    unsigned char bit6:6;
} bit6;

/* A single 5-bit field */
union {
    unsigned char bit5:5;
} bit5;

/* 2 bytes <-> 16-bit value */
union {
    unsigned char wchar[2];
    unsigned int  wint:16;
} wchar2int;

/* 3 bytes <-> 24-bit value */
union {
    unsigned char tchar[3];
    unsigned int  tint:24;
} tchar2int;

/* 5 bytes <-> 40-bit value */
union {
    unsigned char fchar5[5];
    unsigned long fflong:40;
} ffchar2long;

/* 6 bytes <-> 48-bit value */
union {
    unsigned char schar[6];
    unsigned long slong:48;
} schar2long;

/* 8 bytes <-> 64-bit value */
union {
    unsigned char echar[8];
    unsigned long elong: 64;
} echar2long;

/**
 * @brief NOTICE
 * Binlog header.
 * Every binlog packet has the following structure. All members are raw
 * bytes (20 bytes in total), so multi-byte values must be decoded by the
 * reader.
 * NOTE(review): the per-field meanings below are inferred from the member
 * names - confirm against the MySQL binlog event header documentation.
 */
typedef struct _BINLOG_HEADER{
    unsigned char eof;            /* 1 byte; presumably the packet status marker - confirm */
    unsigned char timestamp[4];   /* 4 bytes; event timestamp */
    unsigned char event_type;     /* 1 byte; presumably a LOG_EVENT_TYPE value - confirm */
    unsigned char server_fd[4];   /* 4 bytes; presumably the originating server id - confirm */
    unsigned char event_size[4];  /* 4 bytes; event size */
    unsigned char log_pos[4];     /* 4 bytes; log position */
    unsigned char flags[2];       /* 2 bytes; event flags */
} BINLOG_HEADER;

/**
 * @brief NOTICE
 * Every MySQL response packet contains the following structure:
 * the common packet header, a one-byte response marker, and then either
 * an error code or the OK-packet fields, overlaid in a union.
 * NOTE(review): the layout relies on bit-fields (including `unsigned char`
 * bit-fields), whose placement is implementation-defined - confirm on the
 * target compilers before mapping this onto raw network bytes.
 */
typedef struct _MYSQL_RESPONSE_PAKCET {
    MYSQL_PACKET_HEADER mph;
    unsigned header : 8; /* [00] or [fe] is the OK packet header, [ff] is the Error packet */
    union {
        unsigned int error_code : 16; /* 16-bit error code */
        struct {
            unsigned char affected_rows: 8;  /* 8-bit affected-rows value */
            unsigned char last_insert_id: 8; /* 8-bit last-insert-id value */
        } vv;
    } v;
} MYSQL_RESPONSE_PACKET;

/**
 * @brief NOTICE
 * Below are the event-specific headers.
 * 1. QUERY_EVENT
 *
 * Fixed-size part of a QUERY_EVENT (13 raw bytes); the variable-length
 * parts that follow it are listed inside the struct.
 */
typedef struct _QUERY_EVENT_HEADER
{
    unsigned char slave_proxy_id[4];      /* 4 bytes */
    unsigned char execution_time[4];      /* 4 bytes */
    unsigned char schema_length;          /* 1 byte; length of the schema name that follows */
    unsigned char error_code[2];          /* 2 bytes */
    unsigned char status_vars_length[2];  /* 2 bytes; length of the status_vars that follow */
    /**
    * @brief After this header come, in order:
    * 1. status_vars
    * 2. schema
    * 3. [00]
    * 4. query string[EOF]
    */
} QUERY_EVENT_HEADER;

/*
 * Node helpers for the FCL_LIST based lists used by this module.
 */

/*
 * Creates an RMQ_NODE from a string.
 * NOTE(review): the argument is presumably the JSON text stored in
 * RMQ_NODE.response - confirm, including who owns the buffer afterwards.
 */
RMQ_NODE *new_rmq_node(char *response);

/*
 * Creates a LIST_DATA node. Declared with (void) so that this is a real
 * prototype and calls that pass arguments are rejected by the compiler.
 */
LIST_DATA *new_list_data(void);

/*
 * Destructor callbacks taking the node as an untyped pointer.
 * trash_list_data is the callback handed to fcl_list_append/fcl_list_push
 * by the new_list_append/new_list_push macros below.
 */
int trash_rmq_node(void *ptr);
int trash_list_data(void *ptr);

/* Look up a LIST_DATA node in `list` by its `id` or `val` member. */
LIST_DATA *new_list_find_id(FCL_LIST *list, int id);
LIST_DATA *new_list_find_val(FCL_LIST *list, long val);

/* Append/push `v` onto list `f`, registering trash_list_data as destructor. */
#define new_list_append(f, v) fcl_list_append((f), (v), trash_list_data)
#define new_list_push(f, v) fcl_list_push((f), (v), trash_list_data)

/*
 * Returns the length associated with a MySQL column type code.
 * NOTE(review): presumably backed by the MySQL_TYPE_LEN table - confirm.
 */
int mb_get_mysql_type_len(int type);

/**
 * @brief NOTICE
 * Send a query to the server.
 *
 * mb_query takes the connection descriptor `mysql_fd`, the query text and
 * its length in bytes (`query_len`), and returns an int status.
 * NOTE(review): the meaning of the return value is not visible in this
 * header - confirm against the implementation.
 *
 * mb_query_table_field_length takes a table name with its length and a
 * schema name, and returns a MYSQL_RES pointer.
 * NOTE(review): presumably the caller releases the result and NULL signals
 * failure - confirm.
 */
int mb_query(int mysql_fd, char *query, unsigned long query_len);
MYSQL_RES *mb_query_table_field_length(char *table, unsigned long table_len, char *schema);
/**
 * @brief NOTICE
 * Sends the COM_REGISTER_SLAVE MySQL packet on `mysql_fd`.
 *
 * The hostname, user and password are each passed as a buffer plus a
 * length; the lengths are `unsigned char`, so each is limited to 255 bytes.
 * `slave_port` is the port reported for the slave.
 * NOTE(review): the meaning of the int return value is not visible in this
 * header - confirm against the implementation.
 */
int mb_register_slave(int mysql_fd, unsigned long master_id, unsigned long server_id,
    char *slave_hostname, unsigned char slave_hostname_length,
    char *slave_user, unsigned char slave_user_len,
    char *slave_password, unsigned char slave_password_len, unsigned int slave_port);

/**
 * @brief NOTICE
 * Sends the COM_BINLOG_DUMP request to the master on `mysql_fd`.
 *
 * `binlog_pos` is the position to start from, and `binlog_filename` with
 * `binlog_filename_len` names the binlog file.
 * NOTE(review): `server_fd` is presumably this slave's server id, and the
 * meaning of the int return value is not visible here - confirm both.
 */
int mb_binlog_dump(int mysql_fd, unsigned long binlog_pos,
    unsigned long server_fd, char *binlog_filename, unsigned int binlog_filename_len);
/*
 * Parses the binlog stream arriving on the connection `mysql_fd`.
 * NOTE(review): presumably runs until the connection ends - confirm.
 */
void mb_parse_binlog(int mysql_fd);

/*
 * Entry point that starts the slave routine; takes no arguments.
 * Declared with (void) so that this is a real prototype: an empty
 * parameter list would leave the arguments unchecked by the compiler.
 */
void mb_begin_slave(void);

#endif /* MYSQL_BINLOG_MYSQL_BINLOG_H */
